🎙️阅读本文需 10 分钟
我们很高兴地宣布 StreamNative 和 OVHcloud 开源了 KoP(Kafka on Pulsar)。KoP 将 Kafka 协议处理插件引入 Pulsar broker。这样一来,Apache Pulsar 就支持原生 Apache Kafka 协议。
将 KoP 协议处理插件添加到现有 Pulsar 集群后,用户不用修改代码就可以将现有的 Kafka 应用程序和服务迁移到 Pulsar。
这样,Kafka 应用程序就可以使用 Pulsar 的强大功能,例如:
Apache Pulsar 是一个事件流平台。最初,Apache Pulsar 就采用云原生、分层分片的架构。该架构将服务和存储分离开来,使系统实现更友好的容器化。Pulsar 的云原生架构具备强扩展性、高一致性和高弹性,使公司能通过实时数据解决方案扩展业务。自 2016 年开源以来,Pulsar 已得到广泛采用,并于 2018 年成为 Apache 顶级项目。
Plusar 为队列和流工作负载提供统一的消息模型。Pulsar 支持自己基于 protobuf 的二进制协议,以确保高性能和低延迟。protobuf 有利于实现 Pulsar 客户端。而且,该项目也支持 Java,Go,Python 和 C ++ 语言以及社区提供的第三方客户端。但是,对于使用其他消息传输协议编写的应用程序,用户必须重写这些应用程序,否则这些应用程序无法采用 Pulsar 新的统一消息传输协议。
为了解决这一问题,Pulsar 社区开发了一些应用程序,以便将 Kafka 应用程序从其他消息系统迁移到 Pulsar。例如,Pulsar 在 Kafka Java API 上提供了 Kafka wrapper。
Kafka wrapper 允许用户在不改变代码的情况下将其使用的 Kafka Java 客户端应用程序从 Kafka 切换到 Pulsar。Pulsar 还提供丰富的 connector 生态系统,用于连接 Pulsar 和其他数据系统。
但是,那些想要从其他 Kafka 应用程序切换到 Pulsar 的用户仍然有强烈的需求。
StreamNative 和 OVHcloud 合作StreamNative 收到大量的入站请求,请求帮助从其他消息系统迁移到 Pulsar 。同时,StreamNative 也意识到在 Pulsar 上原生支持其他消息传输协议(例如 AMQP 和 Kafka)的必要性。
所以,StreamNative 开始致力于将通用协议处理插件框架引入到 Pulsar 中。该框架允许使用其他消息传输协议的开发人员使用 Pulsar。多年来,OVHcloud 一直采用 Apache Kafka。尽管他们有在 Kafka 上运行多个集群且每秒处理数百万条消息的经验,但仍面临艰巨的运营挑战。例如,如果不使用多租户特性,他们很难将成千上万个用户的数千个 Topic 放在一个集群中。所以,OVHcloud 放弃 Kafka,决定将其主题即服务的产品(即 ioStream)转移到 Pulsar,并在 Pulsar 上构建其产品。与 Kafka 相比,Pulsar 支持多租户特性且其整体架构包含 Apache BookKeep 组件,这有助于简化用户操作。
在初步实验之后,OVHcloud 决定将 KoP 作为 PoC proxy,将 Kafka 协议即时转换到 Pulsar。在此过程中,OVHcloud 注意到 StreamNative 正在致力于将 Kafka 协议原生地引入到 Pulsar。于是,他们联手开发了 KoP。
KoP 旨在利用 Pulsar 和 BookKeeper 的事件流存储架构和 Pulsar 的可插拔协议处理插件框架来提供一种精简而全面的解决方案。KoP 是一个协议名称为「kafka」的协议处理插件。KoP 绑定在 Pulsar broker上,并与 Pulsar broker 一起运行。
关于日志,Pulsar 和 Kafka 都采用非常相似的数据模型,用于发布/订阅消息和事件流。例如,Pulsar 和 Kafka 都采用分布式日志。这两个系统的主要区别在于它们如何实现分布式日志。Kafka 采用分区的架构,将分布式日志(Kafka 分区中的日志)存储在一组 broker 中。Pulsar 采用分片的架构,利用 Apache BookKeeper 作为其横向扩展的分片存储层,将分布式日志存储在 Apache BookKeeper 中。Pulsar 基于分片的架构有助于避免数据搬迁、实现高扩展性、以及持久地存储事件流。Pulsar 和 Kafka 都基于相似的数据模型(分布式日志)进行搭建,而且 Pulsar 采用分布式日志存储和可插拔的协议处理插件框架(在2.5.0 版本中引入),所以 Pulsar 可以很容易地实现兼容 Kafka 的协议处理插件。通过对比 Pulsar 和 Kafka,我们发现这两种系统有很多相似之处。这两种系统都包括以下操作:- Topic 查找:所有客户端都连接到任一 broker 以查找 Topic 的元数据(即 owner broker)。获取元数据之后,客户端与 owner broker 建立持久的 TCP 连接。
- 发布:客户端与 Topic 区的 owner broker 进行对话,以将消息追加到分布式日志中。
- 消费:客户端与 Topic 分区的 owner broker 进行对话,以便从分布式日志中读取消息。
- 偏移量:为发布给 Topic 分区的消息分配偏移量。在 Pulsar 中,偏移量被称为 MessageId。consumer 可以使用偏移量来查找日志中的给定位置,以便读取消息。
- 消费状态:这两个系统都维护订阅中的 consumer( Kafka 称之为消费组)的消费状态。Kafka 将消费状态存储在 `__offsets` Topic,而 Pulsar 将消费状态存储在 `cursors`。
正如你所见,这些都是横向扩展分布式日志存储(例如 Apache BookKeeper)提供的所有原始操作。
Pulsar 的核心功能是在 Apache BookKeeper 上实现的。因此,我们可以非常简单、直接地使用 Pulsar 在 BookKeeper 上开发的现有组件来实现 Kafka 概念。
下图说明了我们如何在 Pulsar 中添加 Kafka 协议支持。我们引入一个新的协议处理插件,该协议处理插件利用 Pulsar 的现有组件(例如 Topic 发现、分布式日志库-ManagedLedger、cursor 等)来实现 Kafka 传输协议。
Kafka 将所有 Topic 存储在扁平的命名空间。但是,Pulsar 将 Topic 存储在层次化、多租户的命名空间。我们在 broker 配置中添加了 `kafkaNamespace` 配置,这样管理员就可以将 Kafka Topic 映射到 Pulsar Topic。为了方便 Kafka 用户使用 Apache Pulsar 的多租户特性,当 Kafka 用户使用 SASL 验证机制来验证 Kafka 客户端的时候,可以指定一个 Pulsar 租户和命名空间作为其 SASL 用户名。
🔎消息 ID 和偏移量
Kafka 为每条被成功发布到 Topic 分区的消息都指定了一个偏移量。Pulsar 为每条消息指定了一个 `MessageID`。消息 ID 由 `ledger-id`、 `entry-id` 和 `batch-index` 组成。我们在 Pulsar-Kafka wrapper 中使用相同的方法将 Pulsar 的消息 ID 转换为偏移量,反之亦然。
Kafka 和 Pulsar 的消息都包含键、值、时间戳和 header(在 Pulsar 中被称作 ‘properties’)。我们自动在 Kafka 消息和 Pulsar 消息之间转换这些字段。
🔎Topic 查找
我们为 Kafka 和 Pulsar 的请求处理插件提供相同的 Topic 查找方法。请求处理插件发现 Topic,查找所请求的 Topic 分区的全部所有权,然后将包含所有权信息的 Kafka `TopicMetadata` 返回给 Kafka 客户端。
🔎发布消息
当收到 Kafka 客户端发布的消息后,Kafka 请求处理插件逐一将多个字段(例如键、值、时间戳和 headers)进行映射,从而将 Kafka 消息转换为 Pulsar 消息。
同时,Kafka 请求处理插件利用 ManagedLedger append API 将这些已转化的 Pulsar 消息存储在 BookKeeper。Kafka 请求处理插件将 Kafka 消息转换为 Pulsar 消息后,现有的 Pulsar 应用程序就可以接收 Kafka 客户端发布的消息。
🔎消费消息
当收到 Kafka 客户端的 consumer 请求时,Kafka 请求处理插件打开一个非持久 cursor,然后从请求的偏移量开始读取 entries。Kafka 请求处理插件将 Pulsar 消息转换回 Kafka 消息后,现有的 Kafka 应用程序就可以接收 Pulsar 客户端发布的消息。🔎Group coordinator & 偏移量管理
最大的挑战是实现 group coordinator 和偏移量管理。Pulsar 不支持集中的 group coordinator,无法为消费组里的 consumer 分配分区,也无法管理每个消费组的偏移量。Pulsar broker 基于分区来管理分区分配,而分区的 owner broker 通过将确认信息存储在 cursors 来管理偏移量。
我们很难让 Pulsar 模型与 Kafka 模型保持一致。因此,为了完全兼容 Kafka 客户端,我们将 coordinator group 的更改和偏移量存储在 Pulsar 名为 `public/kafka/__offsets` 系统 Topic 中,从而实现 Kafka coordinator group。这样,我们能够在 Pulsar 和 Kafka 之间建立桥梁,并允许用户使用现有的 Pulsar 工具和策略来管理订阅并监控 Kafka consumer。我们在已实现的 coordinator group 中添加一个后台线程,定期将偏移量更新从系统 Topic 同步到 Pulsar cursor。因此,实际上 Kafka 消费组被认为是 Pulsar 订阅。所有现有的 Pulsar 工具也可以用于管理 Kafka 消费组。
StreamNative 和 OVHcloud 都重视客户的成功。我们相信,在 Apache Pulsar 上提供原生 Kafka 协议能够帮助采用 Pulsar 的用户更快地取得业务成功。
KoP 整合了两个流行的事件流生态系统,解锁了新的用例。客户可以利用这两个生态系统的优势,借助 Apache Pulsar 构建一个真正统一的事件流平台,加速开发实时应用程序和服务。
KoP 使日志收集器可以继续从其来源收集日志数据,并使用现有的 Kafka 集成向 Apache Pulsar 发布消息。下游应用程序可以使用 Pulsar Functions 来处理到达系统的事件,实现无服务器化事件流传输。KoP 使用 Apache License V2 许可证,项目地址如下。
🔗https://github.com/streamnative/kopStreamNative Platform 已经内置 KoP。你可以选择下载 StreamNative Platform来试用 KoP 的所有功能。
🔗http://streamnative.io/download
如果已经运行 Pulsar 集群,并且希望其支持 Kafka 协议,可以将 KoP 协议处理插件安装到现有的 Pulsar 集群。相关详细信息,请参考如下链接说明
🔗http://streamnative.io/docs/latest/connect-and-ingest/kop/get-started -kop /#configure
如果想要了解有关 KoP 的更多信息,请参考 KoP 的代码和文档信息。
我们期待你提出问题和 PR。你也可以在 Pulsar Slack 中加入 #kop 频道,讨论有关 Kafka-on-Pulsar 的所有事情。
Slack 申请:
https://apache-pulsar.herokuapp.com/同时,StreamNative 和 OVHcloud 将于太平洋时间 3 月 31 日举办有关 KoP 的网络研讨会。如果想要了解更多详细信息,可以扫描下图二维码进行申请。期待与你在网上见面。
最初,StreamNative 发起 KoP 项目。后来,OVHcloud 团队加入了该项目。我们一起合作开发 KoP 项目。非常感谢 OVHcloud 的 Pierre Zemb 和 Steven Le Roux 对这个项目的贡献!想要随时掌握 Pulsar 的研发进展、用户案例和热点话题吗?快来关注 Apache Pulsar 和 StreamNative 微信公众号,我们第一时间在这里分享与 Pulsar 有关的一切。